iT邦幫忙

2023 iThome 鐵人賽

DAY 4
0

Airflow 來寫第一個 DAG 吧

第一個例子

現在,我們來看一個簡單但真實的Airflow DAG 例子,並介紹其中一些常見的運算符(task)。

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

def filter_top_50_stocks(*args, **kwargs):
    # 在這個函數中,您可以從 task1 回傳的股票數據中過濾出前50大的股票
    # 並將結果存儲在 task instance 的上下文中,以便後續的 Postgres 操作使用
    stocks = kwargs['ti'].xcom_pull(key='fetch_stock_data'  # 在這裡獲取前50大股票的邏輯
	  top_50_stocks = filter_top_50() #省略實作

    kwargs['ti'].xcom_push(key='top_50_stocks', value=top_50_stocks)

default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'stock_data_etl',
    default_args=default_args,
    description='DAG to fetch and store stock data',
    schedule_interval=timedelta(days=1),  # Daily execution
    catchup=False,
)

# Task 1: Fetch stock data
fetch_stock_data = SimpleHttpOperator(
    task_id='fetch_stock_data',
    method='GET',
    http_conn_id='stock_api',  # You need to set up a connection for the API
    endpoint='/fetch_stock_data',
    dag=dag,
)

# Task 2: PythonOperator to filter top 50 stocks
filter_top_50_stocks_task = PythonOperator(
    task_id='filter_top_50_stocks',
    python_callable=filter_top_50_stocks,
    provide_context=True,
    dag=dag,
)

# Task 3: Store data in Postgres
store_in_postgres = PostgresOperator(
    task_id='store_in_postgres',
    postgres_conn_id='postgres_connection',  # You need to set up a connection for PostgreSQL
    sql='INSERT INTO stock_data (date, symbol, closing_price) SELECT date, symbol, closing_price FROM external_stock_data WHERE symbol IN ({{ ti.xcom_pull(task_ids="filter_top_50_stocks", key="top_50_stocks") }});',
    dag=dag,
)

# Task 4: Notify success
notify_success = SimpleHttpOperator(
    task_id='notify_success',
    method='POST',
    http_conn_id='notification',
    endpoint='/notify_success',
    data={"message": "Stock data ETL job completed successfully."},
    headers={"Content-Type": "application/json"},
    dag=dag,
)

# Define task dependencies
fetch_stock_data >> filter_top_50_stocks_task >> store_in_postgres >> notify_success

在這個例子中,我們使用了幾種常見的運算符(task operator):

  1. SimpleHttpOperator(簡單的HTTP操作器):用於執行HTTP請求的操作器,可以用於呼叫外部API或網站。
  2. PythonOperator(Python操作器):允許執行自定義Python函數的操作器,用於任何需要Python代碼執行的任務。
  3. PostgresOperator(Postgres操作器):用於執行PostgreSQL數據庫上的SQL操作的操作器,例如執行查詢或數據庫更改。

PostgresOperator

由於各式各樣的資料庫太多了,相關的 operator 基本上大同小異,就是提供 SQL 及 connection id 去讀取資料庫,就不一一介紹。

connection id 會跟 connection 元件有關係,這個我們後面會再提供。而我們抓下來的資料會透過 XCom 跟 SQL Template 傳遞,如同程式裡的這段:

sql='INSERT INTO stock_data (date, symbol, closing_price) SELECT date, symbol, closing_price FROM external_stock_data WHERE symbol IN ({{ ti.xcom_pull(task_ids="filter_top_50_stocks", key="top_50_stocks") }});'

兩個大括號 {{ }} 在 Airflow 內會自動被 parse,藉此取得前一個 task 傳來的值。

XCom 的用法之後也會再說明。

PythonOperator

這個可能是 Airflow 內最常用到的 operator 了,由於 python 的易用跟流行性,即使你不熟任何 Airflow 內建或是第三方寫好的 plugin (相信我,它們很好用),你幾乎都可以在 PythonOperator 內自己寫出來。

例如 SimpleHttpOperator,你當然可以在 python operator 內寫一段 python code,使用 request 呼叫 API,效果大致上是一樣的,如果你認為這兩者是綁定的工作的話。

我們下一篇再來說明 PythonOperator 的一些用法跟注意事項。


上一篇
Airflow DAG 是啥? - Day3
下一篇
Airflow PythonOperator (一) - Day5
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
obarisk
iT邦研究生 1 級 ‧ 2023-09-05 08:32:22

通知可以用 on_sucess_callback 就可以了

kaihg1028 iT邦新手 5 級 ‧ 2023-09-05 21:16:52 檢舉

是的,你說的沒錯。我這裡是為了展示常用的邏輯所以獨立拆成一個 task,但 Airflow 有提供這個接口,可以簡化我們的 DAG 邏輯。

我要留言

立即登入留言